package com.example.zookeeper.config.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.springframework.util.ObjectUtils;

import java.util.Map;
import java.util.Random;

/**
 * 自定义发送分区策略
 *
 * Created by yanweiwen on 2020-09-13.
 */
public class SendPartitionStrategy implements Partitioner {

    /**
     * topic分区数量
     */
    private static final int partitions = 3;

    /**
     * 分区规则
     *
     * @param topic 主题
     * @param key 消息key
     * @param keyBytes 消息key byte
     * @param value 消息内容
     * @param valueBytes 消息内容byte
     * @param cluster 集群
     * @return 返回的int值为分区序号
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (ObjectUtils.isEmpty(key)) {
            return new Random().nextInt(partitions);
        }
        return Math.abs(key.hashCode())%3;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }

}
